From f19c38821278a8e24ede404a954d32be4ef3654e Mon Sep 17 00:00:00 2001 From: "mjw@wray-m-3.hpl.hp.com" Date: Wed, 21 Jul 2004 11:22:39 +0000 Subject: [PATCH] bitkeeper revision 1.1108.1.4 (40fe51ffLxtDXSQ_TOJZQcpn_HcB3g) Rework xend client to support synchronous and asynchronous connections. --- tools/python/xen/xend/XendClient.py | 644 +++++++++++++++++++--------- 1 file changed, 431 insertions(+), 213 deletions(-) diff --git a/tools/python/xen/xend/XendClient.py b/tools/python/xen/xend/XendClient.py index 538f84c3a9..ea7840ed07 100644 --- a/tools/python/xen/xend/XendClient.py +++ b/tools/python/xen/xend/XendClient.py @@ -1,6 +1,8 @@ +#!/usr/bin/env python # Copyright (C) 2004 Mike Wray """Client API for the HTTP interface on xend. Callable as a script - see main(). +Supports synchronous or asynchronous connection to xend. This API is the 'control-plane' for xend. The 'data-plane' is done separately. For example, consoles @@ -11,9 +13,12 @@ import sys import httplib import types from StringIO import StringIO -import urlparse + from twisted.protocols import http +from twisted.internet.protocol import ClientCreator +from twisted.internet.defer import Deferred +from twisted.internet import reactor from encode import * import sxp @@ -22,40 +27,22 @@ import PrettyPrint DEBUG = 0 class XendError(RuntimeError): - pass - -class Foo(httplib.HTTPResponse): - - def begin(self): - fin = self.fp - while(1): - buf = fin.readline() - print "***", buf - if buf == '': - print - sys.exit() - - -def sxprio(sxpr): - """Convert an sxpr to a string. + """Error class for 'expected errors' when talking to xend. """ - io = StringIO() - sxp.show(sxpr, out=io) - print >> io - io.seek(0) - return io + pass def fileof(val): - """Converter for passing configs. + """Converter for passing configs or other 'large' data. Handles lists, files directly. Assumes a string is a file name and passes its contents. """ if isinstance(val, types.ListType): - return sxprio(val) + return sxp.to_string(val) if isinstance(val, types.StringType): return file(val) if hasattr(val, 'readlines'): return val + raise XendError('cannot convert value') # todo: need to sort of what urls/paths are using for objects. # e.g. for domains at the moment return '0'. @@ -66,98 +53,251 @@ def fileof(val): # maps /xend/domain/0 to http://wray-m-3.hpl.hp.com:8000/xend/domain/0 # And should accept urls for ids? -def urljoin(location, root, prefix='', rest=''): - prefix = str(prefix) - rest = str(rest) - base = 'http://' + location + root + prefix - url = urlparse.urljoin(base, rest) - return url +class URL: + """A URL. + """ -def nodeurl(location, root, id=''): - return urljoin(location, root, 'node/', id) + def __init__(self, proto='http', host='localhost', port=None, path='', query=None, frag=None): + self.proto = proto + self.host = host + if port: port = int(port) + self.port = port + self.path = path + self.query = query + self.frag = frag + + def url(self): + """Get the full URL string including protocol, location and the full path. + """ + return self.proto + '://' + self.location() + self.fullpath() -def domainurl(location, root, id=''): - return urljoin(location, root, 'domain/', id) + def location(self): + """Get the location part of the URL, including host and port, if present. + """ + if self.port: + return self.host + ':' + str(self.port) + else: + return self.host -def consoleurl(location, root, id=''): - return urljoin(location, root, 'console/', id) + def fullpath(self): + """Get the full path part of the URL, including query and fragment if present. + """ + u = [ self.path ] + if self.query: + u.append('?') + u.append(self.query) + if self.frag: + u.append('#') + u.append(self.frag) + return ''.join(u) + + def relative(self, path='', query=None, frag=None): + """Create a URL relative to this one. + """ + return URL(proto=self.proto, + host=self.host, + port=self.port, + path=self.path + path, + query=query, + frag=frag) + +class XendRequest: + """A request to xend. + """ -def deviceurl(location, root, id=''): - return urljoin(location, root, 'device/', id) + def __init__(self, url, method, args): + """Create a request. Sets up the headers, argument data, and the + url. -def vneturl(location, root, id=''): - return urljoin(location, root, 'vnet/', id) + @param url: the url to request + @param method: request method, GET or POST + @param args: dict containing request args, if any + """ + if url.proto != 'http': + raise ValueError('Invalid protocol: ' + url.proto) + (hdr, data) = encode_data(args) + if args and method == 'GET': + url.query = data + data = None + if method == "POST" and url.path.endswith('/'): + url.path = url.path[:-1] + + self.headers = hdr + self.data = data + self.url = url + self.method = method + +class XendClientProtocol: + """Abstract class for xend clients. + """ -def eventurl(location, root, id=''): - return urljoin(location, root, 'event/', id) + def xendRequest(self, url, method, args=None): + """Make a request to xend. + Implement in a subclass. + + @param url: xend request url + @param method: http method: POST or GET + @param args: request arguments (dict) + """ + raise NotImplementedError() + + def xendGet(self, url, args=None): + """Make a xend request using HTTP GET. + Requests using GET are usually 'safe' and may be repeated without + nasty side-effects. + + @param url: xend request url + @param data: request arguments (dict) + """ + return self.xendRequest(url, "GET", args) + + def xendPost(self, url, args): + """Make a xend request using HTTP POST. + Requests using POST potentially cause side-effects, and should + not be repeated unless you really want to repeat the side + effect. + + @param url: xend request url + @param args: request arguments (dict) + """ + return self.xendRequest(url, "POST", args) -def dmesgurl(location, root, id=''): - return urljoin(location, root, 'node/dmesg/', id) + def handleStatus(self, version, status, message): + """Handle the status returned from the request. + """ + status = int(status) + if status in [ http.NO_CONTENT ]: + return None + if status not in [ http.OK, http.CREATED, http.ACCEPTED ]: + return self.handleException(XendError(message)) + return 'ok' + + def handleResponse(self, data): + """Handle the data returned in response to the request. + """ + if data is None: return None + try: + pin = sxp.Parser() + pin.input(data); + pin.input_eof() + val = pin.get_val() + except sxp.ParseError, err: + return self.handleException(err) + if isinstance(val, types.ListType) and sxp.name(val) == 'xend.err': + err = XendError(val[1]) + return self.handleException(err) + return val -def xend_request(url, method, data=None): - """Make a request to xend. + def handleException(self, err): + """Handle an exception during the request. + May be overridden in a subclass. + """ + raise err - url xend request url - method http method: POST or GET - data request argument data (dict) +class SynchXendClientProtocol(XendClientProtocol): + """A synchronous xend client. This will make a request, wait for + the reply and return the result. """ - urlinfo = urlparse.urlparse(url) - (uproto, ulocation, upath, uparam, uquery, ufrag) = urlinfo - if DEBUG: print url, urlinfo - if uproto != 'http': - raise StandardError('Invalid protocol: ' + uproto) - if DEBUG: print '>xend_request', ulocation, upath, method, data - (hdr, args) = encode_data(data) - if data and method == 'GET': - upath += '?' + args - args = None - if method == "POST" and upath.endswith('/'): - upath = upath[:-1] - if DEBUG: print "ulocation=", ulocation, "upath=", upath, "args=", args - #hdr['User-Agent'] = 'Mozilla' - #hdr['Accept'] = 'text/html,text/plain' - conn = httplib.HTTPConnection(ulocation) - #conn.response_class = Foo - if DEBUG: conn.set_debuglevel(1) - conn.request(method, upath, args, hdr) - resp = conn.getresponse() - if DEBUG: print resp.status, resp.reason - if DEBUG: print resp.msg.headers - if resp.status in [ http.NO_CONTENT ]: - return None - if resp.status not in [ http.OK, http.CREATED, http.ACCEPTED ]: - raise XendError(resp.reason) - pin = sxp.Parser() - data = resp.read() - if DEBUG: print "***data" , data - if DEBUG: print "***" - pin.input(data); - pin.input_eof() - conn.close() - val = pin.get_val() - #if isinstance(val, types.ListType) and sxp.name(val) == 'val': - # val = val[1] - if isinstance(val, types.ListType) and sxp.name(val) == 'xend.err': - raise XendError(val[1]) - if DEBUG: print '**val='; sxp.show(val); print - return val - -def xend_get(url, args=None): - """Make a xend request using GET. - Requests using GET are 'safe' and may be repeated without - nasty side-effects. - """ - return xend_request(url, "GET", args) + + def xendRequest(self, url, method, args=None): + """Make a request to xend. + + @param url: xend request url + @param method: http method: POST or GET + @param args: request arguments (dict) + """ + self.request = XendRequest(url, method, args) + conn = httplib.HTTPConnection(url.location()) + if DEBUG: conn.set_debuglevel(1) + conn.request(method, url.fullpath(), self.request.data, self.request.headers) + resp = conn.getresponse() + val = self.handleStatus(resp.version, resp.status, resp.reason) + if val is None: + data = None + else: + data = resp.read() + conn.close() + val = self.handleResponse(data) + return val -def xend_call(url, data): - """Make xend request using POST. - Requests using POST potentially cause side-effects and should - not be repeated unless it really is wanted to do the side - effect again. +class AsynchXendClient(http.HTTPClient): + """A subclass of twisted's HTTPClient to deal with a connection to xend. + Makes the request when connected, and delegates handling responses etc. + to its protocol (usually an AsynchXendClientProtocol instance). + """ + def __init__(self, protocol, request): + self.protocol = protocol + self.request = request + + def connectionMade(self): + request = self.request + url = self.request.url + self.sendCommand(request.method, url.fullpath()) + self.sendHeader('Host', url.location()) + for (k, v) in request.headers.items(): + self.sendHeader(k, v) + self.endHeaders() + if request.data: + self.transport.write(request.data) + + def handleStatus(self, version, status, message): + return self.protocol.handleStatus(version, status, message) + + def handleResponse(self, data): + return self.protocol.handleResponse(data) + +class AsynchXendClientProtocol(XendClientProtocol): + """An asynchronous xend client. Uses twisted to connect to xend + and make the request. It does not block waiting for the result, + but sets up a deferred that is called when the result becomes available. + + Uses AsynchXendClient to manage the connection. """ - return xend_request(url, "POST", data) + def __init__(self): + self.err = None + + def xendRequest(self, url, method, args=None): + """Make a request to xend. The returned deferred is called when + the result is available. + + @param url: xend request url + @param method: http method: POST or GET + @param args: request arguments (dict) + @return: deferred + """ + request = XendRequest(url, method, args) + self.deferred = Deferred() + clientCreator = ClientCreator(reactor, AsynchXendClient, self, request) + clientCreator.connectTCP(url.host, url.port) + return self.deferred + + def callErrback(self, err): + if not self.deferred.called: + self.err = err + self.deferred.errback(err) + return err + + def callCallback(self, val): + if not self.deferred.called: + self.deferred.callback(val) + return val + + def handleException(self, err): + return self.callErrback(err) + + def handleResponse(self, data): + if self.err: return self.err + val = XendClientProtocol.handleResponse(self, data) + if isinstance(val, Exception): + self.callErrback(val) + else: + self.callCallback(val) + return val + class Xend: + """Client interface to Xend. + """ """Default location of the xend server.""" SRV_DEFAULT = "localhost:8000" @@ -165,199 +305,262 @@ class Xend: """Default path to the xend root on the server.""" ROOT_DEFAULT = "/xend/" - def __init__(self, srv=None, root=None): + def __init__(self, client=None, srv=None, root=None): + """Create a xend client interface. + If the client protocol is not specified, the default + is to use a synchronous protocol. + + @param client: client protocol to use + @param srv: server host, and optional port (format host:port) + @param root: xend root path on the server + """ + if client is None: + client = SynchXendClientProtocol() + self.client = client self.bind(srv, root) def bind(self, srv=None, root=None): """Bind to a given server. - srv server location (host:port) - root server xend root path + @param srv: server location (host:port) + @param root: xend root path on the server """ if srv is None: srv = self.SRV_DEFAULT if root is None: root = self.ROOT_DEFAULT if not root.endswith('/'): root += '/' - self.location = srv - self.root = root + (host, port) = srv.split(':', 1) + self.url = URL(host=host, port=port, path=root) + + def xendGet(self, url, args=None): + return self.client.xendGet(url, args) + + def xendPost(self, url, data): + return self.client.xendPost(url, data) def nodeurl(self, id=''): - return nodeurl(self.location, self.root, id) + return self.url.relative('node/' + id) def domainurl(self, id=''): - return domainurl(self.location, self.root, id) + return self.url.relative('domain/' + id) def consoleurl(self, id=''): - return consoleurl(self.location, self.root, id) + return self.url.relative('console/' + id) def deviceurl(self, id=''): - return deviceurl(self.location, self.root, id) + return self.url.relative('device/' + id) def vneturl(self, id=''): - return vneturl(self.location, self.root, id) + return self.url.relative('vnet/' + id) def eventurl(self, id=''): - return eventurl(self.location, self.root, id) + return self.url.relative('event/' + id) def dmesgurl(self, id=''): - return dmesgurl(self.location, self.root, id) + return self.url.relative('node/dmesg/' + id) def xend(self): - return xend_get(urljoin(self.location, self.root)) + return self.xendGet(self.url) def xend_node(self): - return xend_get(self.nodeurl()) + return self.xendGet(self.nodeurl()) def xend_node_cpu_rrobin_slice_set(self, slice): - return xend_call(self.nodeurl(), - {'op' : 'cpu_rrobin_slice_set', - 'slice' : slice }) + return self.xendPost(self.nodeurl(), + {'op' : 'cpu_rrobin_slice_set', + 'slice' : slice }) def xend_node_cpu_bvt_slice_set(self, ctx_allow): - return xend_call(self.nodeurl(), - {'op' : 'cpu_bvt_slice_set', - 'ctx_allow' : ctx_allow }) + return self.xendPost(self.nodeurl(), + {'op' : 'cpu_bvt_slice_set', + 'ctx_allow' : ctx_allow }) def xend_node_cpu_fbvt_slice_set(self, ctx_allow): - return xend_call(self.nodeurl(), - {'op' : 'cpu_fbvt_slice_set', - 'ctx_allow' : ctx_allow }) + return self.xendPost(self.nodeurl(), + {'op' : 'cpu_fbvt_slice_set', + 'ctx_allow' : ctx_allow }) def xend_domains(self): - return xend_get(self.domainurl()) + return self.xendGet(self.domainurl()) def xend_domain_create(self, conf): - return xend_call(self.domainurl(), - {'op' : 'create', - 'config' : fileof(conf) }) + return self.xendPost(self.domainurl(), + {'op' : 'create', + 'config' : fileof(conf) }) def xend_domain_restore(self, filename): - return xend_call(self.domainurl(), - {'op' : 'restore', - 'file' : filename }) + return self.xendPost(self.domainurl(), + {'op' : 'restore', + 'file' : filename }) - def xend_domain_configure(self, id, config): - return xend_call(self.domainurl(id), - {'op' : 'configure', - 'config' : fileof(conf) }) + def xend_domain_configure(self, id, conf): + return self.xendPost(self.domainurl(id), + {'op' : 'configure', + 'config' : fileof(conf) }) def xend_domain(self, id): - return xend_get(self.domainurl(id)) + return self.xendGet(self.domainurl(id)) def xend_domain_unpause(self, id): - return xend_call(self.domainurl(id), - {'op' : 'unpause' }) + return self.xendPost(self.domainurl(id), + {'op' : 'unpause' }) def xend_domain_pause(self, id): - return xend_call(self.domainurl(id), - {'op' : 'pause' }) + return self.xendPost(self.domainurl(id), + {'op' : 'pause' }) def xend_domain_shutdown(self, id, reason): - return xend_call(self.domainurl(id), - {'op' : 'shutdown', - 'reason' : reason }) + return self.xendPost(self.domainurl(id), + {'op' : 'shutdown', + 'reason' : reason }) def xend_domain_destroy(self, id, reason): - return xend_call(self.domainurl(id), - {'op' : 'destroy', - 'reason' : reason }) + return self.xendPost(self.domainurl(id), + {'op' : 'destroy', + 'reason' : reason }) def xend_domain_save(self, id, filename): - return xend_call(self.domainurl(id), - {'op' : 'save', - 'file' : filename }) + return self.xendPost(self.domainurl(id), + {'op' : 'save', + 'file' : filename }) def xend_domain_migrate(self, id, dst): - return xend_call(self.domainurl(id), - {'op' : 'migrate', - 'destination': dst }) + return self.xendPost(self.domainurl(id), + {'op' : 'migrate', + 'destination': dst }) def xend_domain_pincpu(self, id, cpu): - return xend_call(self.domainurl(id), - {'op' : 'pincpu', - 'cpu' : cpu }) + return self.xendPost(self.domainurl(id), + {'op' : 'pincpu', + 'cpu' : cpu }) def xend_domain_cpu_bvt_set(self, id, mcuadv, warp, warpl, warpu): - return xend_call(self.domainurl(id), - {'op' : 'cpu_bvt_set', - 'mcuadv' : mcuadv, - 'warp' : warp, - 'warpl' : warpl, - 'warpu' : warpu }) + return self.xendPost(self.domainurl(id), + {'op' : 'cpu_bvt_set', + 'mcuadv' : mcuadv, + 'warp' : warp, + 'warpl' : warpl, + 'warpu' : warpu }) def xend_domain_cpu_fbvt_set(self, id, mcuadv, warp, warpl, warpu): - return xend_call(self.domainurl(id), - {'op' : 'cpu_fbvt_set', - 'mcuadv' : mcuadv, - 'warp' : warp, - 'warpl' : warpl, - 'warpu' : warpu }) - + return self.xendPost(self.domainurl(id), + {'op' : 'cpu_fbvt_set', + 'mcuadv' : mcuadv, + 'warp' : warp, + 'warpl' : warpl, + 'warpu' : warpu }) + def xend_domain_cpu_atropos_set(self, id, period, slice, latency, xtratime): - return xend_call(self.domainurl(id), - {'op' : 'cpu_atropos_set', - 'period' : period, - 'slice' : slice, - 'latency' : latency, - 'xtratime': xtratime }) - + return self.xendPost(self.domainurl(id), + {'op' : 'cpu_atropos_set', + 'period' : period, + 'slice' : slice, + 'latency' : latency, + 'xtratime': xtratime }) + def xend_domain_vifs(self, id): - return xend_get(self.domainurl(id), - { 'op' : 'vifs' }) + return self.xendGet(self.domainurl(id), + { 'op' : 'vifs' }) def xend_domain_vif(self, id, vif): - return xend_get(self.domainurl(id), - { 'op' : 'vif', - 'vif' : vif }) + return self.xendGet(self.domainurl(id), + { 'op' : 'vif', + 'vif' : vif }) def xend_domain_vbds(self, id): - return xend_get(self.domainurl(id), - {'op' : 'vbds'}) - + return self.xendGet(self.domainurl(id), + {'op' : 'vbds'}) + def xend_domain_vbd(self, id, vbd): - return xend_get(self.domainurl(id), - {'op' : 'vbd', - 'vbd' : vbd }) + return self.xendGet(self.domainurl(id), + {'op' : 'vbd', + 'vbd' : vbd }) def xend_domain_device_create(self, id, config): - return xend_call(self.domainurl(id), - {'op' : 'device_create', - 'config' : fileof(config) }) + return self.xendPost(self.domainurl(id), + {'op' : 'device_create', + 'config' : fileof(config) }) def xend_domain_device_destroy(self, id, type, idx): - return xend_call(self.domainurl(id), - {'op' : 'device_destroy', - 'type' : type, - 'index' : idx }) - + return self.xendPost(self.domainurl(id), + {'op' : 'device_destroy', + 'type' : type, + 'index' : idx }) + def xend_consoles(self): - return xend_get(self.consoleurl()) + return self.xendGet(self.consoleurl()) def xend_console(self, id): - return xend_get(self.consoleurl(id)) + return self.xendGet(self.consoleurl(id)) def xend_vnets(self): - return xend_get(self.vneturl()) + return self.xendGet(self.vneturl()) def xend_vnet_create(self, conf): - return xend_call(self.vneturl(), - {'op': 'create', 'config': fileof(conf) }) + return self.xendPost(self.vneturl(), + {'op' : 'create', + 'config' : fileof(conf) }) def xend_vnet(self, id): - return xend_get(self.vneturl(id)) + return self.xendGet(self.vneturl(id)) def xend_vnet_delete(self, id): - return xend_call(self.vneturl(id), - {'op': 'delete' }) + return self.xendPost(self.vneturl(id), + {'op' : 'delete' }) def xend_event_inject(self, sxpr): - val = xend_call(self.eventurl(), - {'op': 'inject', 'event': fileof(sxpr) }) + val = self.xendPost(self.eventurl(), + {'op' : 'inject', + 'event' : fileof(sxpr) }) def xend_dmesg(self): - return xend_get(self.dmesgurl()) + return self.xendGet(self.dmesgurl()) +def synchmain(srv, argv): + xend = Xend(srv=srv) + if len(argv) > 1: + fn = argv[0] + else: + fn = 'xend' + if not fn.startswith('xend'): + fn = 'xend_' + fn + args = argv[1:] + try: + val = getattr(xend, fn)(*args) + PrettyPrint.prettyprint(val) + except XendError, err: + print 'ERROR:', err + sys.exit(1) + +def xendmain(srv, asynch, fn, args): + if asynch: + client = AsynchXendClientProtocol() + else: + client = None + xend = Xend(srv=srv, client=client) + xend.rc = 0 + try: + v = getattr(xend, fn)(*args) + except XendError, err: + print 'ERROR:', err + return 1 + if asynch: + def cbok(val): + PrettyPrint.prettyprint(val) + reactor.stop() + def cberr(err): + print 'ERROR:', err + xend.rc = 1 + reactor.stop() + v.addCallback(cbok) + v.addErrback(cberr) + reactor.run() + return xend.rc + else: + PrettyPrint.prettyprint(v) + return 0 + def main(argv): """Call an API function: @@ -367,18 +570,33 @@ def main(argv): Example: > python XendClient.py domains - (domain 0 8) + (0 8) > python XendClient.py domain 0 (domain (id 0) (name Domain-0) (memory 128)) """ - server = Xend() - fn = argv[1] + global DEBUG + from getopt import getopt + short_options = 'x:ad' + long_options = ['xend=', 'asynch', 'debug'] + (options, args) = getopt(argv[1:], short_options, long_options) + srv = None + asynch = 0 + for k, v in options: + if k in ['-x', '--xend']: + srv = v + elif k in ['-a', '--asynch']: + asynch = 1 + elif k in ['-d', '--DEBUG']: + DEBUG = 1 + if len(args): + fn = args[0] + args = args[1:] + else: + fn = 'xend' + args = [] if not fn.startswith('xend'): fn = 'xend_' + fn - args = argv[2:] - val = getattr(server, fn)(*args) - PrettyPrint.prettyprint(val) - print + sys.exit(xendmain(srv, asynch, fn, args)) if __name__ == "__main__": main(sys.argv) -- 2.30.2